Skip to content

feat: add executor pool support#687

Open
HuaHuaY wants to merge 1 commit into
apache:mainfrom
HuaHuaY:infra
Open

feat: add executor pool support#687
HuaHuaY wants to merge 1 commit into
apache:mainfrom
HuaHuaY:infra

Conversation

@HuaHuaY
Copy link
Copy Markdown
Contributor

@HuaHuaY HuaHuaY commented May 28, 2026

No description provided.

Copy link
Copy Markdown
Member

@wgtmac wgtmac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work. The abstraction is clean and the test coverage is solid. Since the default executor is nullopt, existing paths stay single-threaded, so the risk is well contained.

A couple of things before merge:

  • The PR has no description. Worth adding the motivation and a short design note, ideally with the parallel/serial numbers that justify it.
  • The RetryRunner change from a runtime fluent API (OnlyRetryOn/StopRetryOn) to the compile-time RetryRunner<Policy> is a breaking API change that's independent of executor support. Consider splitting it into its own PR so it gets reviewed on its own merits.

Left a few inline notes.


ExecutorTask executor_task(
[promise = std::move(promise), task = std::move(task)]() mutable {
promise.set_value(std::move(task)());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a task throws instead of returning a Status, the promise is never set and the exception escapes on the worker thread, while future.get() sees a broken promise. The repo uses Result/Status rather than exceptions, so this is unlikely in our own code, but Executor is a public extension point and users may plug in pools where it matters. Wrapping the call in try/catch and turning the exception into a Status would be safer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Errors resulting from violating calling conventions are the user's responsibility, not the library's.

if (!executor_.has_value()) {
return internal::RunTasksSingleThreaded(std::move(tasks_));
}
return internal::RunTasksParallel(executor_->get(), std::move(tasks_));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Run() blocks on future.get() from the calling thread. That's fine for the current sequential call sites, but PlanWith is now public API. If someone drives a TaskGroup from a worker thread of the same bounded executor, it can deadlock (pool saturated while waiting on a task queued behind it). Worth documenting that the driving thread must not be one of the executor's own workers.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue of a thread pool's tasks spawning more tasks and then blocking themselves is a problem that I believe any thread pool with a thread limit will encounter, not just a problem with this particular PR.

Comment thread src/iceberg/manifest/manifest_group.cc Outdated
Copy link
Copy Markdown
Member

@wgtmac wgtmac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more minor notes, non-blocking.

Comment thread src/iceberg/manifest/manifest_filter_manager.cc Outdated
Comment thread src/iceberg/update/snapshot_update.cc Outdated
Comment thread src/iceberg/util/retry_util.h
Copy link
Copy Markdown
Member

@wgtmac wgtmac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another pass, this time on the interface design and extensibility.

The overall shape is good: one virtual Executor that engines implement, threaded through the builders via PlanWith. The Arrow adapter test is a nice proof that an external pool drops in with basically one line, so "bring your own threadpool" is well covered.

On future async directions (C++23 coroutines, P2300 std::execution): the model here is synchronous and blocking, TaskGroup::Run() fans out and blocks on std::future::get(), and the planning APIs return Result<...> directly. So this is a parallel-for primitive, not a step toward a sender/receiver pipeline. That's a reasonable scope for now, I'd just flag it explicitly so nobody expects this interface to extend into async later, it'll be a separate one. Details inline.

virtual ~Executor() = default;

/// \brief Schedule a task for execution.
virtual Status Submit(ExecutorTask task) = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fire-and-forget execute-style primitive (closer to the abandoned P0443 executor.execute than to P2300's scheduler/sender). Completion is tracked outside, via the std::promise/future plumbing in RunTasksParallel. Fine for a blocking parallel-for, but it doesn't lay groundwork for coroutines or std::execution: those need the executor to hand back something awaitable/composable, and the planning APIs (PlanFiles() -> Result<...>) are synchronous anyway. Going async later would be a separate interface, not an extension of this one. Worth stating in the header that Executor is a parallel-dispatch primitive, not an async scheduler.

Separately: ExecutorTask being move-only is the right call and matches Arrow, but pools whose submit takes a copyable std::function will need std::move_only_function or a small shim to adapt.

virtual Status Submit(ExecutorTask task) = 0;
};

using OptionalExecutor = std::optional<std::reference_wrapper<Executor>>;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::optional<std::reference_wrapper<Executor>> is a little awkward to use (executor_->get() at the call sites). A plain Executor* expresses "nullable, non-owning borrow" just as well and reads cleaner everywhere it's passed. Minor.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Holding a pointer requires a check to determine whether it needs to be destructed, which is detrimental to future code maintainability.

futures.reserve(tasks.size());

std::vector<Error> errors;
for (auto& task : tasks) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All tasks are submitted up front, each with its own promise/future. For a handful of manifests that's fine, but as a general primitive there's no concurrency bound and no fail-fast: N tasks always queue N and allocate N futures, and if one fails early the rest still run to completion. Probably out of scope for this PR, but worth a note if engines may push large fan-outs through here.

Comment thread src/iceberg/update/snapshot_update.h Outdated
Copy link
Copy Markdown
Member

@wgtmac wgtmac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more pass. The main thing I found is a documentation gap around a new concurrency contract that now leaks onto user-supplied callbacks.

Once an executor is set, the user's ManifestWriterFactory and the shared FileIO get called from multiple worker threads. The tests already account for this (the factories use an atomic counter plus a barrier), so the requirement is understood, it's just not written down anywhere a downstream engine would see it. Worth documenting on the public surface.

Comment thread src/iceberg/manifest/manifest_merge_manager.h Outdated
Comment thread src/iceberg/delete_file_index.cc Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants